
feat(go): add go deserialization support via io streams #3374

Merged
chaokunyang merged 40 commits into apache:main from
ayush00git:feat/go-deserialization
Mar 6, 2026

Conversation


@ayush00git ayush00git commented Feb 20, 2026

Why?

To enable stream-based deserialization in Fory's Go library, allowing direct reading from io.Reader without pre-buffering the entire payload. This improves efficiency for network- and file-based transport and brings the Go implementation to feature parity with the Python and C++ libraries.

What does this PR do?

1. Stream Infrastructure in go/fory/buffer.go

Enhanced ByteBuffer to support io.Reader with an internal sliding window and automatic filling.

  • Added reader io.Reader and minCap int fields.
  • Implemented fill(n int, err *Error) bool for on-demand data fetching and buffer compaction.
  • Added CheckReadable(n) and Skip(n) memory-safe routines that pull from the underlying stream when necessary to avoid out-of-bounds panics.
  • Updated ReadBinary and ReadBytes to safely copy slices when streaming to prevent silent data corruption on compaction.
  • Updated all Read* methods (fixed-size, varint, tagged) to fetch data from the reader safely if not cached.

2. Stateful InputStream in go/fory/stream.go

Added the InputStream feature to support true, stateful sequential stream reads.

  • Introduced InputStream which persists the buffered byte window and TypeResolver metadata (Meta Sharing) across multiple object decodes on the same stream, decoupled from Fory to mirror the C++ ForyInputStream implementation.
  • Added fory.DeserializeFromStream(is, target) method to process continuous streamed data.
  • Added Shrink() method to compact the internal buffer and reclaim memory during long-lived streams.
  • Added DeserializeFromReader method as an API for simple one-off stream object reads.

3. Stream-Safe Deserialization Paths

Updated internal deserialization pipelines in struct.go and type_def.go to be stream-safe:

  • Integrated CheckReadable bounds-checking into the struct.go fast paths for fixed-size primitives.
  • Safely rewrote schema-evolution skips (skipTypeDef) in type_def.go to use bounds-checked Skip() rather than unbounded readerIndex overrides.

4. Comprehensive Stream Tests

  • Built a custom oneByteReader wrapper (go/fory/test_helper_test.go) that artificially feeds the deserialization engine exactly 1 byte at a time.
  • Migrated the global test suite (struct_test.go, primitive_test.go, slice_primitive_test.go, etc.) to run all standard tests through this aggressive 1-byte fragmented stream reader via a new testDeserialize helper to guarantee total stream robustness.

Related issues

Closes #3302

Does this PR introduce any user-facing change?

  • New public APIs: NewInputStream, DeserializeFromStream, DeserializeFromReader, NewByteBufferFromReader.
  • Does this PR introduce any binary protocol compatibility change?

Benchmark

Main branch - (benchmark screenshot)

This branch - (benchmark screenshot)

@ayush00git ayush00git changed the title feat(go): add go desrialization support via transport streams feat(go): add go desrialization support via io streams Feb 20, 2026
@ayush00git
Contributor Author

Hey @chaokunyang
Please have a review and let me know what changes are needed.

@Zakir032002

hey @ayush00git, looked through this and the main issue i see is in DeserializeFromReader
it calls ResetWithReader at the start of every call:

func (f *Fory) DeserializeFromReader(r io.Reader, v any) error {
    defer f.resetReadState()
    f.readCtx.buffer.ResetWithReader(r, 0) // this wipes the prefetch window every time

so if fill() reads ahead past the first object boundary (which it will), those bytes
are gone on the next call. sequential decode from one stream is broken:

for {
    var msg Msg
    f.DeserializeFromReader(conn, &msg) // bytes after first object get thrown away
}

if you look at how this is handled for c++/python — the Buffer is constructed
from the stream once and passed to each deserialize call directly. the buffer holds
state across calls, it's never reset between objects. the python test
test_stream_deserialize_multiple_objects_from_single_stream shows this exactly —
the same reader buffer is passed to multiple fory.deserialize() calls.

the go version probably needs something similar — a stream reader type that owns the
buffer and gets reused across deserializations rather than resetting on each call.

Happy to discuss if I'm misreading the flow here

@ayush00git
Contributor Author

Hi @Zakir032002
Thanks for noticing this. You're exactly right, this is a bug in my implementation: the call would clear any prefetched data from the ByteBuffer, making sequential reads from the stream impossible, and it was clearing the type metadata as well. I'll look at the C++/Python implementations to correct the deserializer.

@Zakir032002

hey @ayush00git, one more thing — ReadBinary and ReadBytes return a direct slice into
b.data:

v := b.data[b.readerIndex : b.readerIndex+length]
return v

the problem is fill() compacts the buffer in-place:

copy(b.data, b.data[b.readerIndex:])

so if someone reads a []byte field and holds onto that slice, then the next
read triggers a fill() — the compaction just overwrote the bytes they're
still holding. no error, no panic, just wrong data.

in stream mode you probably want to copy before returning instead of aliasing:

if b.reader != nil {
    result := make([]byte, length)
    copy(result, b.data[b.readerIndex:b.readerIndex+length])
    b.readerIndex += length
    return result
}

in-memory path stays as is.
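The aliasing hazard is easy to demonstrate with a toy buffer (illustrative sketch, not the Fory code):

```go
package main

import "fmt"

// A slice returned as a direct view into the buffer's backing array is
// silently rewritten when a later compaction slides bytes to the front.
func main() {
	buf := []byte("AAAABBBB")
	readerIndex := 4 // first 4 bytes have been "read"

	aliased := buf[0:4]                      // read that aliases the buffer
	safe := append([]byte(nil), buf[0:4]...) // copy-on-read, as suggested above

	// Simulate fill()'s compaction: slide unread bytes to the front.
	copy(buf, buf[readerIndex:])

	fmt.Printf("aliased=%q safe=%q\n", aliased, safe) // aliased="BBBB" safe="AAAA"
}
```

No error and no panic: the aliased slice simply holds the wrong bytes, which is exactly the silent corruption described above.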

@Zakir032002

also noticed — ReadVarUint32Small7 only does fill(1) for the first byte, but if that byte has 0x80 set it falls through to continueReadVarUint32 which isn't touched in this PR. so in stream mode, if a multi-byte varint straddles a chunk boundary, the continuation bytes may not be in the buffer yet — you either get a BufferOutOfBoundError or silently read the wrong bytes depending on what's sitting at that position in the buffer.

easiest fix is probably just routing the multi-byte case through readVarUint32Slow since that's already stream-aware after your changes. or adding fill(1) guards inside continueReadVarUint32 directly, either works.

Happy to discuss if I'm misreading the flow here

@ayush00git
Contributor Author

Hey @Zakir032002
Sorry, I'm a bit busy with my exams; I'll review the comments as soon as I'm free.

@ayush00git
Contributor Author

Hi @Zakir032002
Thanks for pointing out the flaws.

  • The DeserializeFromReader reset and returning a direct slice into the data stream were wrongly implemented by me; thanks for suggesting the changes to fix them as well.

But I think you misunderstood ReadVarUint32Small7. We already have a check condition:

if len(b.data)-readIdx >= 5 {

}

If we are near a chunk boundary (fewer than 5 bytes remaining in the buffer), execution skips continueReadVarUint32 entirely and jumps straight to readVaruint36Slow. I don't think this part needs any changes.

@ayush00git
Contributor Author

I've added the StreamReader, which now creates a copied slice during deserialization to preserve the data between sequential deserialization calls. DeserializeFromReader is only there if the user wants to deserialize a single struct and doesn't want the stream overhead for that.

@ayush00git
Contributor Author

@chaokunyang
The API design now matches C++. Is there any other modification needed?

@ayush00git ayush00git requested a review from chaokunyang March 3, 2026 17:00
@chaokunyang
Collaborator

I added shrink_input_stream in #3453, could you add a similar feature in this PR? And I renamed StreamReader to InputStream in #3449, please also rename the related APIs. The name StreamReader is not that clear.

copy(b.data, b.data[b.readerIndex:])
b.writerIndex -= b.readerIndex
b.readerIndex = 0
b.data = b.data[:b.writerIndex]
Collaborator

Why do we need to process writerIndex here?

Contributor Author

We're using a sliding-window approach to push the unread bytes to the front, so if we only set readerIndex = 0 and kept writerIndex at its old position, it would track garbage values left in between.
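Concretely (toy example, not the Fory buffer):

```go
package main

import "fmt"

// After sliding unread bytes to the front, writerIndex must move down by the
// same amount, or remaining() would count stale bytes left behind the window.
func main() {
	data := []byte("xxxyz___") // "xxx" consumed, "yz" unread, "_" free space
	readerIndex, writerIndex := 3, 5

	copy(data, data[readerIndex:writerIndex]) // compact: data now "yzxyz___"
	writerIndex -= readerIndex                // 5 -> 2
	readerIndex = 0

	remaining := writerIndex - readerIndex
	fmt.Println(remaining, string(data[readerIndex:writerIndex])) // 2 yz
}
```

Skipping the writerIndex adjustment would leave remaining() at 5, counting the stale "xyz" bytes behind the compacted window.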

if b.readerIndex+8 > len(b.data) {
*err = BufferOutOfBoundError(b.readerIndex, 8, len(b.data))
return 0
if !b.fill(8, nil) {
Collaborator

Why do you pass nil? This will just silence the error.

Contributor Author

my bad. i'll fix it up

//go:inline
func (b *ByteBuffer) ReadVaruint36Small(err *Error) uint64 {
if b.remaining() >= 8 {
if b.remaining() >= 8 || (b.reader != nil && b.fill(8, nil)) {
Collaborator

If it's a network stream and the writer doesn't close the stream, then the stream EOF never comes; this will just hang forever.

Contributor Author

Yes, we should never hang on pre-filling 8 bytes. Let me fix it.

@chaokunyang
Collaborator

Some design suggestions:

  • Keep fast-path varint checks purely remaining >= N; do not call fill(N) in the predicate.
  • Never pass nil error sinks to fill in decode methods that promise bounds errors.
  • Make DeserializeFromReader either strictly stateless (always reset buffer) or explicitly stateful and align it with InputStream semantics.
  • Remove InputStream.reader unless it is actually needed (or use it for invariants/validation).
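The first two suggestions might look like this in code (a sketch with illustrative names, not Fory's actual implementation):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// buf is an illustrative read buffer: the fast path inspects only already
// buffered bytes, and the slow path always receives a real error sink.
type buf struct {
	data        []byte
	readerIndex int
}

func (b *buf) remaining() int { return len(b.data) - b.readerIndex }

func (b *buf) readUint64(err *error) uint64 {
	if b.remaining() >= 8 { // fast path: purely a buffered-bytes predicate, no fill
		v := binary.LittleEndian.Uint64(b.data[b.readerIndex:])
		b.readerIndex += 8
		return v
	}
	return b.readUint64Slow(err) // slow path owns filling and error reporting
}

func (b *buf) readUint64Slow(err *error) uint64 {
	// In a real stream buffer this would fill(8, err); here we just report,
	// never passing a nil sink that would swallow the bounds error.
	*err = fmt.Errorf("need 8 bytes, have %d", b.remaining())
	return 0
}

func main() {
	b := &buf{data: []byte{1, 0, 0, 0, 0, 0, 0, 0, 9}}
	var err error
	v := b.readUint64(&err)
	fmt.Println(v, err) // 1 <nil>
	v = b.readUint64(&err)
	fmt.Println(v, err)
}
```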

@chaokunyang
Collaborator

And please run benchmarks/go for the current branch and the apache/main branch, and paste the results here.

@ayush00git
Contributor Author

Benchmark: apache/fory/main vs this branch (feat/go-deserialization)

apache/fory/main:

goos: linux
goarch: amd64
cpu: 12th Gen Intel(R) Core(TM) i5-1235U
BenchmarkFory_Struct_Serialize-12              34537162     110.4 ns/op      0 B/op    0 allocs/op
BenchmarkFory_Struct_Deserialize-12            19927696     179.6 ns/op     32 B/op    1 allocs/op
BenchmarkFory_StructList_Serialize-12           3048019    1176 ns/op        0 B/op    0 allocs/op
BenchmarkFory_StructList_Deserialize-12         1583568    2456 ns/op      688 B/op    3 allocs/op
BenchmarkFory_Sample_Serialize-12              14559171     247.5 ns/op      0 B/op    0 allocs/op
BenchmarkFory_Sample_Deserialize-12             3322015    1158 ns/op      676 B/op    9 allocs/op
BenchmarkFory_SampleList_Serialize-12            919764    3815 ns/op        0 B/op    0 allocs/op
BenchmarkFory_SampleList_Deserialize-12          240294   20650 ns/op    13952 B/op  163 allocs/op
BenchmarkFory_MediaContent_Serialize-12         7116196     458.0 ns/op      0 B/op    0 allocs/op
BenchmarkFory_MediaContent_Deserialize-12       2302726    1496 ns/op      656 B/op   13 allocs/op
BenchmarkFory_MediaContentList_Serialize-12      532838    7471 ns/op        0 B/op    0 allocs/op
BenchmarkFory_MediaContentList_Deserialize-12    143080   30065 ns/op    13040 B/op  243 allocs/op

this branch:

goos: linux
goarch: amd64
cpu: 12th Gen Intel(R) Core(TM) i5-1235U
BenchmarkFory_Struct_Serialize-12              32973771     121.9 ns/op      0 B/op    0 allocs/op
BenchmarkFory_Struct_Deserialize-12            27100645     163.6 ns/op     32 B/op    1 allocs/op
BenchmarkFory_StructList_Serialize-12           3265543    1103 ns/op        0 B/op    0 allocs/op
BenchmarkFory_StructList_Deserialize-12         1268991    2679 ns/op      688 B/op    3 allocs/op
BenchmarkFory_Sample_Serialize-12              12737720     258.3 ns/op      0 B/op    0 allocs/op
BenchmarkFory_Sample_Deserialize-12             3287950    1123 ns/op      676 B/op    9 allocs/op
BenchmarkFory_SampleList_Serialize-12            995695    3745 ns/op        0 B/op    0 allocs/op
BenchmarkFory_SampleList_Deserialize-12          231688   18862 ns/op    13952 B/op  163 allocs/op
BenchmarkFory_MediaContent_Serialize-12         6446667     542.3 ns/op      0 B/op    0 allocs/op
BenchmarkFory_MediaContent_Deserialize-12       2616025    1222 ns/op      656 B/op   13 allocs/op
BenchmarkFory_MediaContentList_Serialize-12      414025    8415 ns/op        0 B/op    0 allocs/op
BenchmarkFory_MediaContentList_Deserialize-12    172894   28446 ns/op    13040 B/op  243 allocs/op

@ayush00git
Contributor Author

@chaokunyang
I'm done with the changes and have pasted the benchmarks as well. Have a look now.


// Create a new stream reader. The stream context handles boundaries and compactions.
streamReader := NewInputStream(stream)
err = f.DeserializeFromStream(streamReader, v)
Collaborator

This helper deserializes into the same target twice (first from bytes, then from stream). That can mask stream-path bugs because values populated by the first pass may still be present if the second pass does not fully overwrite fields.

Please deserialize the stream path into a fresh value and compare results, so partial/incorrect stream decoding is detectable.

reader := &slowReader{data: data}
var decoded StreamTestStruct
// Use small minCap (16) to force frequent fills and compactions
f.readCtx.buffer.ResetWithReader(reader, 16)
Collaborator

This setup line does not affect the code path under test, because DeserializeFromReader immediately calls ResetWithReader(r, 0) internally and replaces the min-cap you set here.

If the goal is to verify small-cap refill/compaction behavior, consider using DeserializeFromStream with NewInputStreamWithMinCap(..., 16) so the configured capacity is actually used.

}

// NewInputStreamWithMinCap creates a new InputStream with a specified minimum buffer capacity.
func NewInputStreamWithMinCap(r io.Reader, minCap int) *InputStream {
Collaborator

This is a buffer size, not a capacity; please change the API name.

Contributor Author

I'll change it to NewInputStreamWithBufferSize, does that sound good? It would also match the API implemented in the C++ stream deserialization.

Collaborator

@chaokunyang chaokunyang left a comment

LGTM

@chaokunyang chaokunyang merged commit af6e8b2 into apache:main Mar 6, 2026
58 checks passed

Development

Successfully merging this pull request may close these issues.

[Go] Streaming Deserialization Support For Go
